package org.kie.kogito.quarkus.outbox;

import com.jayway.jsonpath.JsonPath;
import com.jayway.jsonpath.Predicate;
import io.restassured.RestAssured;
import java.io.File;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.time.Duration;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import org.awaitility.Awaitility;
import org.hamcrest.Matchers;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.kie.kogito.test.quarkus.kafka.KafkaTestClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testcontainers.containers.DockerComposeContainer;
import org.testcontainers.containers.output.OutputFrame;
import org.testcontainers.containers.output.Slf4jLogConsumer;
import org.testcontainers.containers.wait.strategy.Wait;

/* loaded from: input_file:org/kie/kogito/quarkus/outbox/OutboxIT.class */
/**
 * Integration test that starts the services described by {@code docker-compose.yml}
 * (kogito, kafka, connect, sidecar), creates an order through the Kogito REST endpoint
 * and verifies that the resulting process and user task events are published to Kafka
 * and that the Debezium connector reports both event topics.
 */
public class OutboxIT {
    private static final Logger LOGGER = LoggerFactory.getLogger(OutboxIT.class);
    private static final Duration TIMEOUT = Duration.ofMinutes(1);
    private static final Duration INTERVAL = Duration.ofSeconds(1);
    private static final String PROCESS_EVENTS_TOPIC = "kogito-processinstances-events";
    private static final String USERTASK_EVENTS_TOPIC = "kogito-usertaskinstances-events";
    private static final String CONNECTOR_NAME = "kogito-connector";
    private static final String CONNECTOR_TOPICS_PATH = CONNECTOR_NAME + ".topics";
    private static final String ORDER_NUMBER = "23570";
    private static final int KOGITO_PORT = 8080;
    private static final int KAFKA_PORT = 9092;
    private static final int DEBEZIUM_PORT = 8083;
    private static DockerComposeContainer<?> COMPOSE;
    private int kogitoPort;
    private int debeziumPort;
    private KafkaTestClient kafkaClient;

    /**
     * Starts the docker-compose environment once for the whole class.
     * The compose file is looked up at {@code ../../docker-compose.yml} first and,
     * if that file does not exist, at {@code docker-compose.yml} in the working directory.
     */
    @BeforeAll
    static void before() {
        Path path = Paths.get("../../docker-compose.yml");
        if (!path.toFile().exists()) {
            path = Paths.get("docker-compose.yml");
        }
        COMPOSE = new DockerComposeContainer<>(path.toFile());
        COMPOSE.withPull(false);
        COMPOSE.withExposedService("kogito", KOGITO_PORT);
        COMPOSE.withExposedService("kafka", KAFKA_PORT);
        COMPOSE.withExposedService("connect", DEBEZIUM_PORT);
        COMPOSE.withLogConsumer("kafka", logger());
        COMPOSE.withLogConsumer("connect", logger());
        COMPOSE.withLogConsumer("sidecar", logger());
        COMPOSE.withLogConsumer("kogito", logger());
        COMPOSE.waitingFor("kafka", Wait.forListeningPort());
        COMPOSE.waitingFor("sidecar", Wait.forListeningPort());
        COMPOSE.waitingFor("kogito", Wait.forListeningPort());
        COMPOSE.start();
    }

    /** Stops the docker-compose environment if it was created. */
    @AfterAll
    static void after() {
        if (COMPOSE != null) {
            COMPOSE.stop();
        }
    }

    /** Returns a consumer that forwards container output frames to this class's SLF4J logger. */
    private static Consumer<OutputFrame> logger() {
        return new Slf4jLogConsumer(LOGGER);
    }

    /** Resolves the mapped service ports and creates a Kafka client pointing at the mapped Kafka port. */
    @BeforeEach
    void setup() {
        this.kogitoPort = COMPOSE.getServicePort("kogito", KOGITO_PORT);
        this.debeziumPort = COMPOSE.getServicePort("connect", DEBEZIUM_PORT);
        int kafkaPort = COMPOSE.getServicePort("kafka", KAFKA_PORT);
        this.kafkaClient = new KafkaTestClient("localhost:" + kafkaPort);
    }

    /** Shuts down the Kafka client created in {@link #setup()}, if any. */
    @AfterEach
    void close() {
        if (this.kafkaClient != null) {
            this.kafkaClient.shutdown();
        }
    }

    /**
     * Posts an order and waits until the expected events arrive on Kafka:
     * two matching process instance events and one matching user task instance event.
     *
     * @throws InterruptedException if the thread is interrupted while waiting on the latches
     */
    @Test
    public void testSendProcessEvents() throws InterruptedException {
        // Wait until the Kogito application answers on /orders.
        Awaitility.await().ignoreExceptions().atMost(TIMEOUT).with().pollInterval(INTERVAL).untilAsserted(() -> {
            RestAssured.given().port(this.kogitoPort)
                    .when().get("/orders")
                    .then().statusCode(200);
        });

        // Wait until task 0 of the connector reports the RUNNING state.
        Awaitility.await().ignoreExceptions().atMost(TIMEOUT).with().pollInterval(INTERVAL).untilAsserted(() -> {
            RestAssured.given().port(this.debeziumPort)
                    .pathParam("connector", CONNECTOR_NAME)
                    .pathParam("task", 0)
                    .when().get("/connectors/{connector}/tasks/{task}/status")
                    .then().statusCode(200)
                    .body("state", Matchers.equalTo("RUNNING"));
        });

        // Before any order is created the connector must not report any topic.
        Awaitility.await().atMost(TIMEOUT).with().pollInterval(INTERVAL).untilAsserted(() -> {
            RestAssured.given().port(this.debeziumPort)
                    .pathParam("connector", CONNECTOR_NAME)
                    .when().get("/connectors/{connector}/topics")
                    .then().statusCode(200)
                    .body(CONNECTOR_TOPICS_PATH, Matchers.hasSize(0));
        });

        CountDownLatch processEvents = new CountDownLatch(2);
        CountDownLatch userTaskEvents = new CountDownLatch(1);
        this.kafkaClient.consume(Set.of(PROCESS_EVENTS_TOPIC, USERTASK_EVENTS_TOPIC),
                message -> onEvent(message, processEvents, userTaskEvents));

        RestAssured.given().port(this.kogitoPort)
                .header("Content-Type", "application/json")
                .body("{\"approver\" : \"john\", \"order\" : {\"orderNumber\" : \"23570\", \"shipped\" : false}}")
                .when().post("/orders")
                .then().statusCode(201)
                .body("approver", Matchers.equalTo("john"))
                .body("order.orderNumber", Matchers.equalTo(ORDER_NUMBER))
                .body("order.shipped", Matchers.equalTo(false));

        // After the order is created the connector must report exactly the two event topics.
        Awaitility.await().atMost(TIMEOUT).with().pollInterval(INTERVAL).untilAsserted(() -> {
            RestAssured.given().port(this.debeziumPort)
                    .pathParam("connector", CONNECTOR_NAME)
                    .when().get("/connectors/{connector}/topics")
                    .then().statusCode(200)
                    .body(CONNECTOR_TOPICS_PATH, Matchers.hasSize(2))
                    .body(CONNECTOR_TOPICS_PATH, Matchers.hasItems(PROCESS_EVENTS_TOPIC, USERTASK_EVENTS_TOPIC));
        });

        Assertions.assertTrue(processEvents.await(TIMEOUT.getSeconds(), TimeUnit.SECONDS));
        Assertions.assertTrue(userTaskEvents.await(TIMEOUT.getSeconds(), TimeUnit.SECONDS));
    }

    /**
     * Handles one consumed Kafka message: reads its {@code $.type} and counts down the latch
     * for that event type when the message carries the unshipped test order.
     * Messages of any other type are ignored.
     *
     * @param message the raw JSON payload of the event
     * @param processEvents latch counted down for matching {@code ProcessInstanceEvent} messages
     * @param userTaskEvents latch counted down for matching {@code UserTaskInstanceEvent} messages
     */
    private static void onEvent(String message, CountDownLatch processEvents, CountDownLatch userTaskEvents) {
        String eventType = JsonPath.<String>read(message, "$.type");
        if ("ProcessInstanceEvent".equals(eventType)) {
            if (isUnshippedTestOrder(message, "$.data.variables.order")) {
                processEvents.countDown();
            }
        } else if ("UserTaskInstanceEvent".equals(eventType)) {
            if (isUnshippedTestOrder(message, "$.data.inputs.input1")) {
                userTaskEvents.countDown();
            }
        }
    }

    /**
     * Tells whether the order found under {@code orderPath} in the message is the test order
     * and is not shipped. Both fields are always read, in the same order as before.
     *
     * @param message the raw JSON payload of the event
     * @param orderPath JsonPath expression of the order object inside the message
     * @return {@code true} if {@code orderNumber} equals the test order number and {@code shipped} is false
     */
    private static boolean isUnshippedTestOrder(String message, String orderPath) {
        String orderNumber = JsonPath.<String>read(message, orderPath + ".orderNumber");
        boolean shipped = JsonPath.<Boolean>read(message, orderPath + ".shipped");
        return ORDER_NUMBER.equals(orderNumber) && !shipped;
    }
}
